fix: mcp cancel scope cross task spin (#9068)#9070
Conversation
There was a problem hiding this comment.
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/func_tool_manager.py" line_range="675-684" />
<code_context>
+ connect_done = asyncio.Event()
+ connect_error: BaseException | None = None
+
+ async def connect_and_lifecycle() -> None:
+ #Single task that handles connect, lifecycle, and cleanup.
+
+ nonlocal connect_error
+ try:
+ await mcp_client.connect_to_server(cfg, name)
+ await mcp_client.list_tools_and_save()
+ except asyncio.CancelledError:
+ # cleanup on cancellation
+ try:
+ await mcp_client.cleanup()
+ except BaseException:
+ pass
+ raise
+ except Exception as e:
+ connect_error = e
+ try:
+ await mcp_client.cleanup()
+ except Exception:
+ pass
+ connect_done.set()
+ return
+
+ # Register tools
+ self.func_list = [
+ f
</code_context>
<issue_to_address>
**issue (bug_risk):** Tool registration and logging errors won’t propagate via connect_error, leaving the caller to see only a timeout.
Because the `try` ends at `await mcp_client.list_tools_and_save()`, any exception during tool registration or logging won’t set `connect_error` or `connect_done`. The caller will then hit the timeout and raise `MCPInitTimeoutError` instead of the real failure. Please extend the `try` to include the registration/logging block, or add a dedicated `try/except` there that sets `connect_error` and signals `connect_done` on error.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| async def connect_and_lifecycle() -> None: | ||
| #Single task that handles connect, lifecycle, and cleanup. | ||
|
|
||
| nonlocal connect_error | ||
| try: | ||
| await mcp_client.connect_to_server(cfg, name) | ||
| await mcp_client.list_tools_and_save() | ||
| except asyncio.CancelledError: | ||
| # cleanup on cancellation | ||
| try: |
There was a problem hiding this comment.
issue (bug_risk): Tool registration and logging errors won’t propagate via connect_error, leaving the caller to see only a timeout.
Because the try ends at await mcp_client.list_tools_and_save(), any exception during tool registration or logging won’t set connect_error or connect_done. The caller will then hit the timeout and raise MCPInitTimeoutError instead of the real failure. Please extend the try to include the registration/logging block, or add a dedicated try/except there that sets connect_error and signals connect_done on error.
There was a problem hiding this comment.
Code Review
This pull request refactors the MCP server startup and lifecycle management in func_tool_manager.py by consolidating connection, lifecycle, and cleanup logic into a single background task (connect_and_lifecycle). Feedback on these changes highlights two critical concurrency issues: first, a potential leak of the background lifecycle_task and uncleaned runtime state if _start_mcp_server is cancelled while waiting for the connection; second, the risk of leaving the manager in an inconsistent state if the cleanup call in the finally block is interrupted by cancellation. Suggestions were provided to catch asyncio.CancelledError during the connection wait and to wrap the termination cleanup in asyncio.shield.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
92328a1 to
ff99de4
Compare
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/func_tool_manager.py" line_range="672" />
<code_context>
+ mcp_client = MCPClient()
+ mcp_client.name = name
+
+ connect_done = asyncio.Event()
+ connect_error: BaseException | None = None
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the `connect_done`/`connect_error` pattern with a single connect `Future` and centralized cleanup to simplify the MCP client startup lifecycle and error handling.
The main added complexity comes from `connect_done` + `connect_error` plus duplicated cleanup inside `connect_and_lifecycle`. You can keep the “single task owns anyio contexts and cleanup” behavior while simplifying the handshake and reusing the existing cleanup helper.
### 1. Replace `connect_done` / `connect_error` with a single `Future`
Use a single `Future` to signal completion/failure of the connect phase, instead of an `Event` + shared exception variable:
```python
mcp_client = MCPClient()
mcp_client.name = name
loop = asyncio.get_running_loop()
connect_future: asyncio.Future[None] = loop.create_future()
async def connect_and_lifecycle() -> None:
try:
await mcp_client.connect_to_server(cfg, name)
await mcp_client.list_tools_and_save()
except asyncio.CancelledError as e:
# Ensure connect_future is completed
if not connect_future.done():
connect_future.set_exception(e)
try:
await self._cleanup_mcp_client_safely(mcp_client, name)
except BaseException:
pass
raise
except Exception as e:
if not connect_future.done():
connect_future.set_exception(e)
try:
await self._cleanup_mcp_client_safely(mcp_client, name)
except BaseException:
pass
return
# Register tools (same as current behavior)
self.func_list = [
f
for f in self.func_list
if not (isinstance(f, MCPTool) and f.mcp_server_name == name)
]
for tool in mcp_client.tools:
func_tool = MCPTool(
mcp_tool=tool,
mcp_client=mcp_client,
mcp_server_name=name,
)
self.func_list.append(func_tool)
logger.info(
f"Connected to MCP server {name}, "
f"Tools: {[t.name for t in mcp_client.tools]}"
)
# Signal successful connect
if not connect_future.done():
connect_future.set_result(None)
try:
await shutdown_event.wait()
logger.info(f"Received shutdown signal for MCP client {name}")
except asyncio.CancelledError:
logger.debug(f"MCP client {name} task was cancelled")
raise
finally:
await asyncio.shield(self._terminate_mcp_client(name))
```
Then the outer timeout / error handling becomes simpler and more idiomatic:
```python
lifecycle_task = asyncio.create_task(
connect_and_lifecycle(), name=f"mcp-client:{name}"
)
async with self._runtime_lock:
self._mcp_server_runtime[name] = _MCPServerRuntime(
name=name,
client=mcp_client,
shutdown_event=shutdown_event,
lifecycle_task=lifecycle_task,
)
self._mcp_starting.discard(name)
try:
await asyncio.wait_for(connect_future, timeout=timeout)
except asyncio.TimeoutError as exc:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raise MCPInitTimeoutError(
f"Connected to MCP server {name} timeout ({timeout:g} seconds)"
) from exc
except asyncio.CancelledError:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raise
except Exception:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raise
```
This keeps:
- A single task owning connection, tool registration, shutdown wait, and `_terminate_mcp_client`.
- Precise timeout on the connect phase.
- Centralized cleanup via `_cleanup_mcp_client_safely` and `_terminate_mcp_client`.
But removes the manual `connect_done` signaling and `connect_error` shared state, making the control flow and error propagation easier to reason about.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| mcp_client = MCPClient() | ||
| mcp_client.name = name | ||
|
|
||
| connect_done = asyncio.Event() |
There was a problem hiding this comment.
issue (complexity): Consider replacing the connect_done/connect_error pattern with a single connect Future and centralized cleanup to simplify the MCP client startup lifecycle and error handling.
The main added complexity comes from connect_done + connect_error plus duplicated cleanup inside connect_and_lifecycle. You can keep the “single task owns anyio contexts and cleanup” behavior while simplifying the handshake and reusing the existing cleanup helper.
1. Replace connect_done / connect_error with a single Future
Use a single Future to signal completion/failure of the connect phase, instead of an Event + shared exception variable:
mcp_client = MCPClient()
mcp_client.name = name
loop = asyncio.get_running_loop()
connect_future: asyncio.Future[None] = loop.create_future()
async def connect_and_lifecycle() -> None:
try:
await mcp_client.connect_to_server(cfg, name)
await mcp_client.list_tools_and_save()
except asyncio.CancelledError as e:
# Ensure connect_future is completed
if not connect_future.done():
connect_future.set_exception(e)
try:
await self._cleanup_mcp_client_safely(mcp_client, name)
except BaseException:
pass
raise
except Exception as e:
if not connect_future.done():
connect_future.set_exception(e)
try:
await self._cleanup_mcp_client_safely(mcp_client, name)
except BaseException:
pass
return
# Register tools (same as current behavior)
self.func_list = [
f
for f in self.func_list
if not (isinstance(f, MCPTool) and f.mcp_server_name == name)
]
for tool in mcp_client.tools:
func_tool = MCPTool(
mcp_tool=tool,
mcp_client=mcp_client,
mcp_server_name=name,
)
self.func_list.append(func_tool)
logger.info(
f"Connected to MCP server {name}, "
f"Tools: {[t.name for t in mcp_client.tools]}"
)
# Signal successful connect
if not connect_future.done():
connect_future.set_result(None)
try:
await shutdown_event.wait()
logger.info(f"Received shutdown signal for MCP client {name}")
except asyncio.CancelledError:
logger.debug(f"MCP client {name} task was cancelled")
raise
finally:
await asyncio.shield(self._terminate_mcp_client(name))Then the outer timeout / error handling becomes simpler and more idiomatic:
lifecycle_task = asyncio.create_task(
connect_and_lifecycle(), name=f"mcp-client:{name}"
)
async with self._runtime_lock:
self._mcp_server_runtime[name] = _MCPServerRuntime(
name=name,
client=mcp_client,
shutdown_event=shutdown_event,
lifecycle_task=lifecycle_task,
)
self._mcp_starting.discard(name)
try:
await asyncio.wait_for(connect_future, timeout=timeout)
except asyncio.TimeoutError as exc:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raise MCPInitTimeoutError(
f"Connected to MCP server {name} timeout ({timeout:g} seconds)"
) from exc
except asyncio.CancelledError:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raise
except Exception:
lifecycle_task.cancel()
await asyncio.gather(lifecycle_task, return_exceptions=True)
async with self._runtime_lock:
self._mcp_starting.discard(name)
self._mcp_server_runtime.pop(name, None)
raiseThis keeps:
- A single task owning connection, tool registration, shutdown wait, and
_terminate_mcp_client. - Precise timeout on the connect phase.
- Centralized cleanup via
_cleanup_mcp_client_safelyand_terminate_mcp_client.
But removes the manual connect_done signaling and connect_error shared state, making the control flow and error propagation easier to reason about.
#9068
修复了
Modifications / 改动点
修复了禁用mcp服务器时由于作用域不匹配出现的处理器空转问题
Screenshots or Test Results / 运行截图或测试结果
禁用一个MCP,在Debug下:
可以注意到不再出现
Error closing current exit stack错误Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Improve MCP client startup and shutdown handling to prevent idle spinning when a server is disabled and ensure proper cleanup on errors, timeouts, and cancellation.
Bug Fixes:
Enhancements: